{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 一.维特比算法\n",
    "隐状态预测是指在$P(O\\mid\\lambda)$给定的情况下，求一个隐状态序列$I$，使得$P(O,I\\mid\\lambda)$最大，比如下图，有三种隐状态，求在$O=(o_1,o_2,o_3,o_4)$的情况下，选择一条最优路径使得$P(O,I\\mid\\lambda)$最大，假设为图中的红色路径，那么最优路径为$i_1=q_1,i_2=q_2,i_3=q_1,i_4=q_3$\n",
    "![avatar](./source/12_HMM维特比1.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "那如何求最优路径呢？下面这句话画个重点：    \n",
    "\n",
    "**如果求得$i_1\\rightarrow i_4$的最优路径为$i_1^*\\rightarrow i_4^*$，那么$i_3^*\\rightarrow i_4^*$必然为$i_3\\rightarrow i_4$的最优路径** \n",
    "\n",
    "将时刻$t$状态为$i$的所有路径中的概率最大值表示为$\\delta_t(i)$，那么在$\\delta_3(1),\\delta_3(2),\\delta_3(3)$已知的情况下，要求$\\delta_4(\\cdot)$，我们只需在所有$i_3\\rightarrow i_4$的9条路径中找出最优的一条即可，即这时可以将前面的看做黑箱即可\n",
    "![avatar](./source/12_HMM维特比2.png)   \n",
    "\n",
    "在$i_3\\rightarrow i_4$这一步，我们是假设$\\delta_3(\\cdot)$已知了，那如何求解$\\delta_3(\\cdot)$呢？我们继续往前在$i_2\\rightarrow i_3$的9条路径中选择即可，即：   \n",
    "\n",
    "![avatar](./source/12_HMM维特比3.png)    \n",
    "\n",
    "所以，递推关系就出来了：   \n",
    "\n",
    "$$\n",
    "\\delta_t(i)=\\max_{1\\leq j\\leq N}[\\delta_{t-1}(j)a_{ji}]b_i(o_t),i=1,2,..,N\n",
    "$$  \n",
    "\n",
    "由于我们需要求得各节点的状态序列，所以还需要去保存我们的路径状态，从上面的递推过程也可以看出，我们在第$i_4$步的时候就可以确定使其最概率最大的第$i_3$步的取值了，所以我们做一个定义，在时刻$t$状态为$i$的所有单个路径$(i_1,i_2,...,i_{t-1},i_t)$中概率最大的路径的第$t-1$个节点为：      \n",
    "\n",
    "$$\n",
    "\\psi_t(i)=arg\\max_{1\\leq j\\leq N}[\\delta_{t-1}(j)a_{ji}],i=1,2,...,N\n",
    "$$  \n",
    "\n",
    "接下来叙述完整的流程   "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 算法流程\n",
    "已知模型参数$\\lambda=(A,B,\\pi)$和观测数据$O=(o_1,o_2,...,o_T)$：   \n",
    "\n",
    ">（1）初始化   \n",
    "\n",
    "$$\n",
    "\\delta_1(i)=\\pi_ib_i(o_1),i=1,2,...,N\\\\\n",
    "\\psi_t(i)=0,i=1,2,...,N\n",
    "$$   \n",
    "\n",
    ">（2）递推，对$t=2,3,...,T$   \n",
    "\n",
    "$$\n",
    "\\delta_t(i)=\\max_{1\\leq j\\leq N}[\\delta_{t-1}(j)a_{ji}]b_i(o_t),i=1,2,..,N\\\\\n",
    "\\psi_t(i)=arg\\max_{1\\leq j\\leq N}[\\delta_{t-1}(j)a_{ji}],i=1,2,...,N\n",
    "$$    \n",
    "\n",
    ">（3）终止   \n",
    "\n",
    "$$\n",
    "P^*=\\max_{1\\leq i\\leq N}\\delta_T(i)\\\\\n",
    "i_T^*=arg\\max_{1\\leq i\\leq N}[\\delta_T(i)]\n",
    "$$   \n",
    "\n",
    ">（4）回溯最优路径，对$t=T-1,T-2,...,1$   \n",
    "\n",
    "$$\n",
    "i_t^*=\\psi_{t+1}(i_{t+1}^*)\n",
    "$$   返回最优路径$I^*=(i_1^*,i_2^*,...,i_T^*)$"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 二.代码实现\n",
    "继续在上一节的代码上添加..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "\"\"\"\n",
    "隐马尔科夫模型实现，封装到ml_models.pgm\n",
    "\"\"\"\n",
    "\n",
    "class HMM(object):\n",
    "    def __init__(self, hidden_status_num=None, visible_status_num=None):\n",
    "        \"\"\"\n",
    "        :param hidden_status_num: 隐状态数\n",
    "        :param visible_status_num: 观测状态数\n",
    "        \"\"\"\n",
    "        self.hidden_status_num = hidden_status_num\n",
    "        self.visible_status_num = visible_status_num\n",
    "        # 定义HMM的参数\n",
    "        self.pi = None  # 初始隐状态概率分布 shape:[hidden_status_num,1]\n",
    "        self.A = None  # 隐状态转移概率矩阵 shape:[hidden_status_num,hidden_status_num]\n",
    "        self.B = None  # 观测状态概率矩阵 shape:[hidden_status_num,visible_status_num]\n",
    "\n",
    "    def predict_joint_visible_prob(self, visible_list=None, forward_type=\"forward\"):\n",
    "        \"\"\"\n",
    "        前向/后向算法计算观测序列出现的概率值\n",
    "        :param visible_list:\n",
    "        :param forward_type:forward前向，backward后向\n",
    "        :return:\n",
    "        \"\"\"\n",
    "        if forward_type == \"forward\":\n",
    "            # 计算初始值\n",
    "            alpha = self.pi * self.B[:, [visible_list[0]]]\n",
    "            # 递推\n",
    "            for step in range(1, len(visible_list)):\n",
    "                alpha = self.A.T.dot(alpha) * self.B[:, [visible_list[step]]]\n",
    "            # 求和\n",
    "            return np.sum(alpha)\n",
    "        else:\n",
    "            # 计算初始值\n",
    "            beta = np.ones_like(self.pi)\n",
    "            # 递推\n",
    "            for step in range(len(visible_list) - 2, -1, -1):\n",
    "                beta = self.A.dot(self.B[:, [visible_list[step + 1]]] * beta)\n",
    "            # 最后一步\n",
    "            return np.sum(self.pi * self.B[:, [visible_list[0]]] * beta)\n",
    "\n",
    "    def fit_with_hidden_status(self, visible_list, hidden_list):\n",
    "        \"\"\"\n",
    "        包含隐状态的参数估计\n",
    "        :param visible_list: [[],[],...,[]]\n",
    "        :param hidden_list: [[],[],...,[]]\n",
    "        :return:\n",
    "        \"\"\"\n",
    "        self.pi = np.zeros(shape=(self.hidden_status_num, 1))\n",
    "        self.A = np.zeros(shape=(self.hidden_status_num, self.hidden_status_num))\n",
    "        self.B = np.zeros(shape=(self.hidden_status_num, self.visible_status_num))\n",
    "        for i in range(0, len(visible_list)):\n",
    "            visible_status = visible_list[i]\n",
    "            hidden_status = hidden_list[i]\n",
    "            self.pi[hidden_status[0]] += 1\n",
    "            for j in range(0, len(hidden_status) - 1):\n",
    "                self.A[hidden_status[j], hidden_status[j + 1]] += 1\n",
    "                self.B[hidden_status[j], visible_status[j]] += 1\n",
    "            self.B[hidden_status[j + 1], visible_status[j + 1]] += 1\n",
    "        # 归一化\n",
    "        self.pi = self.pi / np.sum(self.pi)\n",
    "        self.A = self.A / np.sum(self.A, axis=0)\n",
    "        self.B = self.B / np.sum(self.B, axis=0)\n",
    "\n",
    "    def fit_without_hidden_status(self, visible_list=None, tol=1e-5, n_iter=10):\n",
    "        \"\"\"\n",
    "        不包含隐状态的参数估计:Baum-Welch算法\n",
    "        :param visible_list: [...]\n",
    "        :param tol:当pi,A,B的增益值变化小于tol时终止\n",
    "        :param n_iter:迭代次数\n",
    "        :return:\n",
    "        \"\"\"\n",
    "        # 初始化参数\n",
    "        if self.pi is None:\n",
    "            self.pi = np.ones(shape=(self.hidden_status_num, 1)) + np.random.random(size=(self.hidden_status_num, 1))\n",
    "            self.pi = self.pi / np.sum(self.pi)\n",
    "        if self.A is None:\n",
    "            self.A = np.ones(shape=(self.hidden_status_num, self.hidden_status_num)) + np.random.random(\n",
    "                size=(self.hidden_status_num, self.hidden_status_num))\n",
    "            self.A = self.A / np.sum(self.A, axis=0)\n",
    "        if self.B is None:\n",
    "            self.B = np.ones(shape=(self.hidden_status_num, self.visible_status_num)) + np.random.random(\n",
    "                size=(self.hidden_status_num, self.visible_status_num))\n",
    "            self.B = self.B / np.sum(self.B, axis=0)\n",
    "        for _ in range(0, n_iter):\n",
    "            # 计算前向概率\n",
    "            alphas = []\n",
    "            alpha = self.pi * self.B[:, [visible_list[0]]]\n",
    "            alphas.append(alpha)\n",
    "            for step in range(1, len(visible_list)):\n",
    "                alpha = self.A.T.dot(alpha) * self.B[:, [visible_list[step]]]\n",
    "                alphas.append(alpha)\n",
    "            # 计算后向概率\n",
    "            betas = []\n",
    "            beta = np.ones_like(self.pi)\n",
    "            betas.append(beta)\n",
    "            for step in range(len(visible_list) - 2, -1, -1):\n",
    "                beta = self.A.dot(self.B[:, [visible_list[step + 1]]] * beta)\n",
    "                betas.append(beta)\n",
    "            betas.reverse()\n",
    "            # 计算gamma值\n",
    "            gammas = []\n",
    "            for i in range(0, len(alphas)):\n",
    "                gammas.append((alphas[i] * betas[i])[:, 0])\n",
    "            gammas = np.asarray(gammas)\n",
    "            # 计算xi值\n",
    "            xi = np.zeros_like(self.A)\n",
    "            for i in range(0, self.hidden_status_num):\n",
    "                for j in range(0, self.hidden_status_num):\n",
    "                    xi_i_j = 0.0\n",
    "                    for t in range(0, len(visible_list) - 1):\n",
    "                        xi_i_j += alphas[t][i][0] * self.A[i, j] * self.B[j, visible_list[t + 1]] * \\\n",
    "                                  betas[t + 1][j][0]\n",
    "                    xi[i, j] = xi_i_j\n",
    "            loss = 0.0  # 统计累计误差\n",
    "            # 更新参数\n",
    "            # 初始概率\n",
    "            for i in range(0, self.hidden_status_num):\n",
    "                new_pi_i = gammas[0][i]\n",
    "                loss += np.abs(new_pi_i - self.pi[i][0])\n",
    "                self.pi[i] = new_pi_i\n",
    "            # 隐状态转移概率\n",
    "            for i in range(0, self.hidden_status_num):\n",
    "                for j in range(0, self.hidden_status_num):\n",
    "                    new_a_i_j = xi[i, j] / np.sum(gammas[:, i][:-1])\n",
    "                    loss += np.abs(new_a_i_j - self.A[i, j])\n",
    "                    self.A[i, j] = new_a_i_j\n",
    "            # 观测概率矩阵\n",
    "            for j in range(0, self.hidden_status_num):\n",
    "                for k in range(0, self.visible_status_num):\n",
    "                    new_b_j_k = np.sum(gammas[:, j] * (np.asarray(visible_list) == k)) / np.sum(gammas[:, j])\n",
    "                    loss += np.abs(new_b_j_k - self.B[j, k])\n",
    "                    self.B[j, k] = new_b_j_k\n",
    "            # 归一化\n",
    "            self.pi = self.pi / np.sum(self.pi)\n",
    "            self.A = self.A / np.sum(self.A, axis=0)\n",
    "            self.B = self.B / np.sum(self.B, axis=0)\n",
    "            if loss < tol:\n",
    "                break\n",
    "\n",
    "    def predict_hidden_status(self, visible_list):\n",
    "        \"\"\"\n",
    "        维特比算法解码概率最大的隐状态\n",
    "        :param visible_list:\n",
    "        :return:\n",
    "        \"\"\"\n",
    "        # 初始化\n",
    "        delta = self.pi * self.B[:, [visible_list[0]]]\n",
    "        psi = [[0] * self.hidden_status_num]\n",
    "        # 递推\n",
    "        for visible_index in range(1, len(visible_list)):\n",
    "            new_delta = np.zeros_like(delta)\n",
    "            new_psi = []\n",
    "            # 当前节点\n",
    "            for i in range(0, self.hidden_status_num):\n",
    "                best_pre_index_i = -1\n",
    "                best_pre_index_value_i = 0\n",
    "                delta_i = 0\n",
    "                # 上一轮节点\n",
    "                for j in range(0, self.hidden_status_num):\n",
    "                    delta_i_j = delta[j][0] * self.A[j, i] * self.B[i, visible_list[visible_index]]\n",
    "                    if delta_i_j > delta_i:\n",
    "                        delta_i = delta_i_j\n",
    "                    best_pre_index_value_i_j = delta[j][0] * self.A[j, i]\n",
    "                    if best_pre_index_value_i_j > best_pre_index_value_i:\n",
    "                        best_pre_index_value_i = best_pre_index_value_i_j\n",
    "                        best_pre_index_i = j\n",
    "                new_delta[i, 0] = delta_i\n",
    "                new_psi.append(best_pre_index_i)\n",
    "            delta = new_delta\n",
    "            psi.append(new_psi)\n",
    "        # 回溯\n",
    "        best_hidden_status = [np.argmax(delta)]\n",
    "        for psi_index in range(len(visible_list) - 1, 0, -1):\n",
    "            next_status = psi[psi_index][best_hidden_status[-1]]\n",
    "            best_hidden_status.append(next_status)\n",
    "        best_hidden_status.reverse()\n",
    "        return best_hidden_status"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[2, 2, 2]\n"
     ]
    }
   ],
   "source": [
    "#用例10.3做测试\n",
    "pi = np.asarray([[0.2], [0.4], [0.4]])\n",
    "A = np.asarray([[0.5, 0.2, 0.3],\n",
    "                [0.3, 0.5, 0.2],\n",
    "                [0.2, 0.3, 0.5]])\n",
    "B = np.asarray([[0.5, 0.5],\n",
    "                [0.4, 0.6],\n",
    "                [0.7, 0.3]])\n",
    "\n",
    "hmm = HMM(hidden_status_num=3, visible_status_num=2)\n",
    "hmm.pi = pi\n",
    "hmm.A = A\n",
    "hmm.B = B\n",
    "print(hmm.predict_hidden_status([0, 1, 0]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
